rabbitmq 消费者Consumer相关特性

前言

本篇主要介绍rabbitmq消费端Consumer的一些基本的使用特性,Consumer Cancel(取消消费者),Consumer Prefetch(设置消费者的预取数量)以及Consumer Priorities(设置消费者的优先级)等等。

Consumer Cancel

queue中的消费者,可能会因为各种原因而被取消,例如:客户端通过向broker发送basic.cancel导致消费者被取消,broker回复一个basic.cancel-ok;消费者消费的queue不可用或被删除以及queue所在节点不可用等都会导致消费者被取消,对于broker由于未知原因导致消费者被取消,客户端是无法感知的。

那么,为了解决这个问题,我们引入了一种扩展机制(即当broker发生未知原因导致消费者被取消,则发送basic.cancel通知client,除了因为节点不可用则是无法通知的),如果是客户端主动向broker发送basic.cancel取消消费者,broker也是不会向client发送basic.cancel的,而是回复basic.cancel-ok。

但是,默认情况下,AMQP 0-9-1 client并没有启用当broker发生未知原因导致消费者被取消的通知功能,我们必须在client-properties属性列表中设置consumer_cancel_notify的值为true,告知broker方可启用,但是通过阅读rabbitmq client sdk源码发现默认就是开启的,可能rabbitmq client sdk是对AMQP 0-9-1 client的进一步封装吧。

rabbitmq sdk源码

ConnectionFactory connectionFactory = new ConnectionFactory();
Map<String, Object> clientProperties = new HashMap<String, Object>();
// 替换默认的client capabilities
connectionFactory.setClientProperties(clientProperties);

# 源码
private Map<String, Object> _clientProperties = AMQConnection.defaultClientProperties();

/**
* Retrieve a copy of the default table of client properties that
* will be sent to the server during connection startup. This
* method is called when each new ConnectionFactory instance is
* constructed.
* @return a map of client properties
* @see Connection#getClientProperties
*/
public static Map<String, Object> defaultClientProperties() {
Map<String,Object> props = new HashMap<String, Object>();
props.put("product", LongStringHelper.asLongString("RabbitMQ"));
props.put("version", LongStringHelper.asLongString(ClientVersion.VERSION));
props.put("platform", LongStringHelper.asLongString("Java"));
props.put("copyright", LongStringHelper.asLongString(Copyright.COPYRIGHT));
props.put("information", LongStringHelper.asLongString(Copyright.LICENSE));
Map<String, Object> capabilities = new HashMap<String, Object>();
capabilities.put("publisher_confirms", true); //publisher_confirms默认开启的
capabilities.put("exchange_exchange_bindings", true);
capabilities.put("basic.nack", true);
capabilities.put("consumer_cancel_notify", true); //consumer_cancel_notify默认开启的
capabilities.put("connection.blocked", true);
capabilities.put("authentication_failure_close", true);
props.put("capabilities", capabilities);
return props;
}

取消通知代码

channel.queueDeclare(queue, false, true, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleCancel(String consumerTag) throws IOException {
// consumer has been cancelled unexpectedly
}
};
channel.basicConsume(queue, consumer);

client主动取消代码

channel.queueDeclare(queue, false, true, false, null);
Consumer consumer = new DefaultConsumer(channel) {
public void handleCancelOk(String consumerTag) {
// client 主动取消消费者
// broker 回复basic.cancel-ok,回调此函数
.......
}
};
channel.basicConsume(queue, consumer);

// consumerTag:当前channel内Consumer的唯一标识,由client指定,如果没有指定则交由服务端自动生成
channel.basicCancel(consumerTag);

Consumer Prefetch

Prefetch机制是用于控制未确认消息数量的一种有效方式。AMQP指定了basic.qos方法,允许我们在使用时限制通道(或连接)上未确认消息的数量(也称为“预取计数”)。但是很不幸的是,通道并不是理想的范围,因为单个通道可能从多个队列消费消息,通道和队列需要为每个发送的消息彼此协调,方可确保它们不超过此限制。这在单个机器上很慢,并且在跨集群消费时更慢。对于大多数使用场景,指定每个消费者的预取计数更加符合场景。当未确认消息数量达到Prefetch限制后,broker将不再继续向该channel或consumer发送消息,直到限制被解除。

因此RabbitMQ重新定义了basic.qos方法中全局标志的含义:
global | Meaning of prefetch_count in AMQP 0-9-1 | Meaning of prefetch_count in RabbitMQ
—|—|—
false | 限制通道channel上未确认消息数量 | 限制每个consumer上未确认消息数量
true | 限制连接connection上未确认消息数量 | 限制通道channel上未确认消息数量
connection,channel,consumer三者之间的关系:一个connection可以创建多个channel,而每个channel又可以创建多个consumer。

示例代码:

// 限制每个consumer未确认消息数量10个
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(10); // Per consumer limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

// 限制通道channel上未确认消息数量15个
Channel channel = ...;
Consumer consumer1 = ...;
Consumer consumer2 = ...;
channel.basicQos(15, true); // Per channel limit
channel.basicConsume("my-queue1", false, consumer1);
channel.basicConsume("my-queue2", false, consumer2);

ps: 大多数client-api上,global的默认值是false;

Consumer Priorities

通常情况下,queue中活跃的消费者都是循环的接收消息,但是如果是设置了优先级的消费者,则消息优先投递给优先级高的消费者(数值越大,优先级越高),只有当高优先级的消费者被阻塞或者被取消时,queue中的消息才会投递给低优先级的消费者;如果存在相同优先级的消费者,则跟通常情况下一样,相同优先级的消费者循环接收消息。

在任何情况下,消费者要么处于active活跃状态,要么处于block阻塞状态。引起消费者从active状态到block状态有很多原因,例如:channel达到basic.qos未确认消息的最大值,这时消费者就会被阻塞;或者简单的因为网络阻塞等。并且由于消费者可以每秒在active和block状态之间来回切换多次,所以在管理界面中是不会显示消费者当前状态的(因为这是非及时的,不准确的)。

示例代码

Channel channel = ...;
Consumer consumer = ...;
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-priority", 10);
channel.basicConsume("my-queue", false, args, consumer);// 创建一个优先级为10的消费者

ps:x-priority为整数值,可以是正数,也可以是负数;并且数值越大优先级越高。

参考链接

  1. http://www.rabbitmq.com/consumer-cancel.html
  2. http://www.rabbitmq.com/consumer-prefetch.html
  3. http://www.rabbitmq.com/consumer-priority.html